Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate Asn1EncodedDataRouter to use Spring Kafka #138

Draft
wants to merge 131 commits into
base: dev
Choose a base branch
from

Conversation

mcook42
Copy link

@mcook42 mcook42 commented Dec 24, 2024

PR Details

Description

  • Migrated Asn1EncodedDataRouter to use Spring Kafka instead of homegrown Kafka implementation
  • Changed from swallowing all exceptions to explicitly handling the non-critical exceptions (where we can reasonably continue to process the data) and throwing specific exceptions where we cannot continue processing (where missing data would cause unexpected downstream issues)
  • Removed duplicated logging wherever EventLogger was used
  • Moved OdeTimJsonTopology to a Spring Configuration class to enable dependency injection for testing and Asn1DecodedDataRouter listener Bean initialization
  • Extracted interactions with the security services module into SecurityServicesClient and defined the request and response models based on the raw JSON manipulation done prior
  • Deleted Asn1CommandManager and moved logic to the Asn1EncodedDataRouter for clearer code and fewer classes to maintain
  • Moved class creation out of Asn1EncodedDataRouter into the relevant Beans to enable easier testing with Spring Dependency Injection
  • Replaced outdated (and buggy) usage of the Date class with usages of LocalDateTime (only in Asn1EncodedDataRouter) to allow for explicit usage of UTC timezone instead of relying on the system settings (they aren't consistent between local and CI, so it's reasonable to suspect there is inconsistency between hosting environments)

Related Issue

No related USDOT issue

Motivation and Context

Implementing Spring Kafka gives us better lifecycle management of producers and consumers, more reusable producer/consumer code, easier testability, and a more robust production-ready Kafka library. This is part of a more significant effort to replace our hand-rolled Kafka implementation with Spring Kafka. The previous changesets related to this effort are:

How Has This Been Tested?

  • Before making any code changes, I refactored the Asn1EncodedDataRouter class to check the output to the different topics is as expected.
  • Refactored Asn1EncodedDataRouter class to use Spring Kafka and confirmed the tests continued to behave as expected
  • I sent the data from the udpsender_[msgType].py scripts found under scripts/tests through a live local system started up with make rebuild. I confirmed there were no errors in the logs. I also confirmed that all expected messages were produced to and consumed from the correct queues by using the kafka-ui container available at localhost:8001 (on my local machine, of course).

For the live testing, I set the following environment variables on my system to enable signing and confirm all pieces played well together

DATA_SIGNING_ENABLED_RSU=true
DATA_SIGNING_ENABLED_SDW=true

To test the signing flow, I produced the test XML to the topic.Asn1EncoderOutput topic via the kafka-ui.

Snippet from the locals during local testing:

2025-01-06 21:57:56 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - ServiceRequest: {"ode":{"verb":"POST","version":"3"},"sdw":{"recordId":"6B573067","serviceRegion":{"nwCorner":{"latitude":"38.98721843900006","longitude":"-104.76767069499999"},"seCorner":{"latitude":"38.96666515900006","longitude":"-104.74048299899994"}},"ttl":"oneday"}} 
2025-01-06 21:57:56 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - Mapped to object ServiceRequest: {"ode":{"version":3,"verb":"POST"},"sdw":{"serviceRegion":{"nwCorner":{"latitude":38.98721843900006,"longitude":-104.76767069499999},"seCorner":{"latitude":38.96666515900006,"longitude":-104.74048299899994}},"ttl":"oneday","recordId":"6B573067"}} 
2025-01-06 21:57:56 [Asn1EncodedDataRouter-0-C-1] INFO  Asn1EncodedDataRouter - Processing unsigned message. 
2025-01-06 21:57:56 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - Signing encoded TIM message... 
2025-01-06 21:57:56 [Asn1EncodedDataRouter-0-C-1] INFO  SecurityServicesClient - Sending data to security services module at http://172.21.103.243:8090/sign with validity override 17820000 to be signed 
2025-01-06 21:57:56 [Asn1EncodedDataRouter-0-C-1] DEBUG SecurityServicesClient - Data to be signed: <SignatureRequestModel(message=AB+AhHARTzcDsCfPIHEyjhIPd12bAwHCZw83QWa8MCf/+T9AvmKBKaAH+Tk34c9ayY36cGpfbQmrdmwbN4AAAAAJnDzdBZrwwITiMCAAhEAzYRQHQBczzcLWID/aF1Y5g1jtJ1V8aFb7qsHDoPjZwiHdhYpg4K2llhJFZ/KxggAQAgCe7rs2AA==, sigValidityOverride=17820000),[Content-Type:"application/json"]> 
2025-01-06 21:58:00 [Asn1EncodedDataRouter-0-C-1] DEBUG SecurityServicesClient - Security services module response: <200 OK OK,SignatureResultModel(messageSigned=AB+AhHARTzcDsCfPIHEyjhIPd12bAwHCZw83QWa8MCf/+T9AvmKBKaAH+Tk34c9ayY36cGpfbQmrdmwbN4AAAAAJnDzdBZrwwITiMCAAhEAzYRQHQBczzcLWID/aF1Y5g1jtJ1V8aFb7qsHDoPjZwiHdhYpg4K2llhJFZ/KxggAQAgCe7rs2AA==c26d50a4, messageExpiry=17820000),[Content-Type:"application/json", Transfer-Encoding:"chunked", Date:"Mon, 06 Jan 2025 21:58:00 GMT", Keep-Alive:"timeout=60", Connection:"keep-alive"]> 
2025-01-06 21:58:00 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - Encoded message - phase 1: 001F808470114F3703B027CF2071328E120F775D9B0301C2670F374166BC3027FFF93F40BE628129A007F93937E1CF5AC98DFA706A5F6D09AB766C1B3780000000099C3CDD059AF0C084E2302000844033611407401733CDC2D6203FDA1756398358ED27557C6856FBAAC1C3A0F8D9C221DD858A60E0ADA596124567F2B182001002009EEEBB3600736E9DE746B8 
2025-01-06 21:58:00 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - No RSUs or SNMP provided. Not sending to RSUs. 
2025-01-06 21:58:00 [kafka-producer-network-thread | producer-5] DEBUG LoggingProducerListener - Successfully produced to topic topic.OdeTIMCertExpirationTimeJson with key null and value {"packetID":"03B027CF2071328E12","startDateTime":"2024-03-08T16:37:05.414Z","requiredExpirationDate":"2024-03-08T21:34:05.414Z","expirationDate":"1970-07-26T06:00:00.000Z"}  
2025-01-06 21:58:00 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - TIM not found in k-table. Skipping deposit to TMC-filtered topic. 
2025-01-06 21:58:00 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - Publishing message for round 2 encoding 
2025-01-06 21:58:00 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - Fully crafted ASD to be encoded: <OdeAsn1Data><metadata><payloadType>us.dot.its.jpo.ode.model.OdeAsdPayload</payloadType><serialId><streamId>05ce8f1c-9c30-4ba1-afb3-5cac3d681ce9</streamId><bundleSize>1</bundleSize><bundleId>0</bundleId><recordId>0</recordId><serialNumber>0</serialNumber></serialId><odeReceivedAt>2025-01-06T21:58:00.453Z</odeReceivedAt><schemaVersion>8</schemaVersion><maxDurationTime>0</maxDurationTime><sanitized>false</sanitized><request><ode><version>3</version><verb>POST</verb></ode><sdw><serviceRegion><nwCorner><latitude>38.98721843900006</latitude><longitude>-104.76767069499999</longitude></nwCorner><seCorner><latitude>38.96666515900006</latitude><longitude>-104.74048299899994</longitude></seCorner></serviceRegion><ttl>oneday</ttl><recordId>6B573067</recordId></sdw></request><encodings><encodings><elementName>AdvisorySituationData</elementName><elementType>AdvisorySituationData</elementType><encodingRule>UPER</encodingRule></encodings></encodings></metadata><payload><dataType>us.dot.its.jpo.ode.plugin.j2735.DdsAdvisorySituationData</dataType><data><AdvisorySituationData><dialogID>156</dialogID><seqID>5</seqID><groupID>00000000</groupID><requestID>0CDD17A4</requestID><recordID>6B573067</recordID><timeToLive>2</timeToLive><serviceRegion><nwCorner><lat>389872184</lat><long>-1047676707</long></nwCorner><seCorner><lat>389666652</lat><long>-1047404830</long></seCorner></serviceRegion><asdmDetails><asdmID>0CDD17A4</asdmID><asdmType>2</asdmType><distType>02</distType><startTime><year>0</year><month>0</month><day>0</day><hour>31</hour><minute>60</minute></startTime><stopTime><year>0</year><month>0</month><day>0</day><hour>31</hour><minute>60</minute></stopTime><advisoryMessage>001F808470114F3703B027CF2071328E120F775D9B0301C2670F374166BC3027FFF93F40BE628129A007F93937E1CF5AC98DFA706A5F6D09AB766C1B3780000000099C3CDD059AF0C084E2302000844033611407401733CDC2D6203FDA1756398358ED27557C6856FBAAC1C3A0F8D9C221DD858A60E0ADA596124567F2B182001002009EEEBB3600736E9DE746B8</advisoryMessage></asdmDetails></AdvisorySituationData></data></payload></OdeAsn1Data> 
2025-01-06 21:58:00 [kafka-producer-network-thread | producer-5] DEBUG LoggingProducerListener - Successfully produced to topic topic.Asn1EncoderInput with key null and value <OdeAsn1Data><metadata><payloadType>us.dot.its.jpo.ode.model.OdeAsdPayload</payloadType><serialId><streamId>05ce8f1c-9c30-4ba1-afb3-5cac3d681ce9</streamId><bundleSize>1</bundleSize><bundleId>0</bundleId><recordId>0</recordId><serialNumber>0</serialNumber></serialId><odeReceivedAt>2025-01-06T21:58:00.453Z</odeReceivedAt><schemaVersion>8</schemaVersion><maxDurationTime>0</maxDurationTime><sanitized>false</sanitized><request><ode><version>3</version><verb>POST</verb></ode><sdw><serviceRegion><nwCorner><latitude>38.98721843900006</latitude><longitude>-104.76767069499999</longitude></nwCorner><seCorner><latitude>38.96666515900006</latitude><longitude>-104.74048299899994</longitude></seCorner></serviceRegion><ttl>oneday</ttl><recordId>6B573067</recordId></sdw></request><encodings><encodings><elementName>AdvisorySituationData</elementName><elementType>AdvisorySituationData</elementType><encodingRule>UPER</encodingRule></encodings></encodings></metadata><payload><dataType>us.dot.its.jpo.ode.plugin.j2735.DdsAdvisorySituationData</dataType><data><AdvisorySituationData><dialogID>156</dialogID><seqID>5</seqID><groupID>00000000</groupID><requestID>0CDD17A4</requestID><recordID>6B573067</recordID><timeToLive>2</timeToLive><serviceRegion><nwCorner><lat>389872184</lat><long>-1047676707</long></nwCorner><seCorner><lat>389666652</lat><long>-1047404830</long></seCorner></serviceRegion><asdmDetails><asdmID>0CDD17A4</asdmID><asdmType>2</asdmType><distType>02</distType><startTime><year>0</year><month>0</month><day>0</day><hour>31</hour><minute>60</minute></startTime><stopTime><year>0</year><month>0</month><day>0</day><hour>31</hour><minute>60</minute></stopTime><advisoryMessage>001F808470114F3703B027CF2071328E120F775D9B0301C2670F374166BC3027FFF93F40BE628129A007F93937E1CF5AC98DFA706A5F6D09AB766C1B3780000000099C3CDD059AF0C084E2302000844033611407401733CDC2D6203FDA1756398358ED27557C6856FBAAC1C3A0F8D9C221DD858A60E0ADA596124567F2B182001002009EEEBB3600736E9DE746B8</advisoryMessage></asdmDetails></AdvisorySituationData></data></payload></OdeAsn1Data>  
2025-01-06 21:58:00 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - ServiceRequest: {"ode":{"verb":"POST","version":"3"},"sdw":{"recordId":"6B573067","serviceRegion":{"nwCorner":{"latitude":"38.98721843900006","longitude":"-104.76767069499999"},"seCorner":{"latitude":"38.96666515900006","longitude":"-104.74048299899994"}},"ttl":"oneday"}} 
2025-01-06 21:58:00 [Asn1EncodedDataRouter-0-C-1] DEBUG Asn1EncodedDataRouter - Mapped to object ServiceRequest: {"ode":{"version":3,"verb":"POST"},"sdw":{"serviceRegion":{"nwCorner":{"latitude":38.98721843900006,"longitude":-104.76767069499999},"seCorner":{"latitude":38.96666515900006,"longitude":-104.74048299899994}},"ttl":"oneday","recordId":"6B573067"}} 
2025-01-06 21:58:00 [kafka-producer-network-thread | producer-5] DEBUG LoggingProducerListener - Successfully produced to topic topic.SDWDepositorInput with key null and value {"encodedMsg":"C44000000000CDD17A46B57306742670F19C166BC56E099BD80B859B761C3866E8BD2202000007FC000007FC11C003F0108E0229E6E07604F9E40E2651C241EEEBB36060384CE1E6E82CD78604FFFF27E817CC50253400FF2726FC39EB5931BF4E0D4BEDA1356ECD8366F00000000133879BA0B35E18109C4604001088066C2280E802E679B85AC407FB42EAC7306B1DA4EAAF8D0ADF755838741F1B38443BB0B14C1C15B4B2C248ACFE56304002004013DDD766C00E6DD3BCE8D700"}  

Types of changes

  • Defect fix (non-breaking change that fixes an issue)
  • New feature (non-breaking change that adds functionality)
  • Breaking change (fix or feature that cause existing functionality to change)

Checklist:

  • I have added any new packages to the sonar-scanner.properties file
  • My change requires a change to the documentation.
  • I have updated the documentation accordingly.
  • I have read the CONTRIBUTING document.
    ODE Contributing Guide
  • I have added tests to cover my changes.
  • All new and existing tests passed.

Helpful Documentation

Generated by sending tim to deposit tim endpoint and grabbing value produced to the `topic.Asn1EncoderOutput` topic via kafka-ui
…TimUnsecured_depositsToSdxTopicAndTimTmcFiltered
…tImpl

These can be used to wrap the calls to security services and allow easier mocking of responses. It also allows us to better shield ourselves from API contract changes. users of the client don't need to be aware of changes to the external API as all interactions can be encapsulated within the client
This moves the calls to security services from Asn1CommandManager to SecurityServicesClientImpl which allows for more testability, flexibility (mocking client calls), and modularity
The responsibility to submit data to topics is already contained within Asn1EncodedDataRouter, and Asn1CommandManager is not responsible for any kafka interactions otherwise, so the responsibility was moved to Asn1EncodedDataRouter
Updated method signatures and instance usage to specify ResponseEvent with Address as the generic type. This change improves type safety and aligns with best practices for clearer code readability and maintainability.
Added `RestClientException` to `signMessage` for better error handling. Implemented a new test case to verify behavior when the signing service returns a null response.
Simplified `getServiceRequest` method by directly passing the metadata JSON object instead of the full consumed object. Updated relevant calls to align with this change, improving clarity and reducing redundant operations.
Removed verbose and redundant JavaDoc comments, replacing them with concise inline comments to improve code readability. This change ensures that key processing logic remains documented without unnecessary detail duplication.
@mcook42 mcook42 force-pushed the mcook42/spring-kafka/asn1-encoded-router branch from 0bc948a to 0d30254 Compare December 26, 2024 23:56
Removed redundant null checks before calling `sendToRsus` in the main logic. The null checks are properly handled within the `sendToRsus` method, ensuring clearer and more maintainable code.
Standardizing on the ObjectMapper provided in the app context will reduce variability and decrease risk of unintended differences in serdes behavior
This name better represents the behavior of the method. The Javadocs were updated to be accurate as well
Copy link
Author

@mcook42 mcook42 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend reviewing the Asn1EncodedDataRouter code from within your IDE. There are a lot of changes in there that don't necessarily work well with GitHub's viewer. The processing flow should not have changed. The changes in that file should be in one of four categories:

  1. Deduplication of JSON processing
  2. Deduplication of message signing or encoding
  3. Using the SerializationConfig-provided Object Mapper instead of JsonUtils or TimTransmorgrifier or other JSON serdes (there were a few weird ones).
  4. Extracting logic into named methods to reduce cognitive complexity

Comment on lines -61 to +55
return streams.state().isRunningOrRebalancing();
return streams.state().equals(KafkaStreams.State.RUNNING);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this method is only used in tests for now, and in the tests, a state of REBALANCING would cause an error. Addressing this possible bug may be worth the effort in a follow-up PR or separate work item. Still, I don't believe the changes belong here since I didn't change how the OdeTimJsonTopology works in this PR.

Comment on lines -169 to +180
kafkaTemplate.send(jsonTopics.getTim(), consumerRecord.key(), odeTimData);
kafkaTemplate.send(jsonTopics.getTim(), streamId, odeTimData);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: the key is empty when consuming from the asn1 decoder output topic. We need to include the streamId as the key when publishing TIM data so that the downstream consumers can use the streamId to lookup the TIM JSON from the K-Table in OdeTimJsonTopology. If we don't produce with a streamId (which we conditionally set a few lines above this method)) then we will never produce to the TMCFiltered topic in the Asn1EncodedDataRouter

Comment on lines -131 to -156
if (metadata.has(TimTransmogrifier.REQUEST_STRING)) {
JSONObject request = metadata.getJSONObject(TimTransmogrifier.REQUEST_STRING);
if (request.has(TimTransmogrifier.RSUS_STRING)) {
Object rsus = request.get(TimTransmogrifier.RSUS_STRING);
if (rsus instanceof JSONObject) {
JSONObject rsusIn = (JSONObject) request.get(TimTransmogrifier.RSUS_STRING);
if (rsusIn.has(TimTransmogrifier.RSUS_STRING)) {
Object rsu = rsusIn.get(TimTransmogrifier.RSUS_STRING);
JSONArray rsusOut = new JSONArray();
if (rsu instanceof JSONArray) {
log.debug("Multiple RSUs exist in the request: {}", request);
JSONArray rsusInArray = (JSONArray) rsu;
for (int i = 0; i < rsusInArray.length(); i++) {
rsusOut.put(rsusInArray.get(i));
}
request.put(TimTransmogrifier.RSUS_STRING, rsusOut);
} else if (rsu instanceof JSONObject) {
log.debug("Single RSU exists in the request: {}", request);
rsusOut.put(rsu);
request.put(TimTransmogrifier.RSUS_STRING, rsusOut);
} else {
log.debug("No RSUs exist in the request: {}", request);
request.remove(TimTransmogrifier.RSUS_STRING);
}
}
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: as far as I can tell by reading the code and stepping through it with a debugger, this code block did nothing. The request variable is manipulated but never used after line 153. Please take a second look at this change. I'm fairly certain it can be safely deleted, but I may be missing something

+ consumedData;
if (log.isDebugEnabled()) {
// print error message and stack trace
EventLogger.logger.error(msg, e);
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Does anyone know if there was a purpose to the EventLogger? Do we consume data from this logger differently?

Date requiredExpirationDate = new Date();
requiredExpirationDate.setTime(timTimestamp.getTime() + maxDurationTime);
timWithExpiration.put("requiredExpirationDate", dateFormat.format(requiredExpirationDate));
} catch (Exception e) {
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this is one of the rare times in this code where we can safely continue (as far as I can tell from the code) processing a message should an Exception be thrown. I intentionally handled the expectation here and allowed the processing to continue. I couldn't find any downstream issues in the code. Also, please note that we already handled the exception and continued processing in the previous implementation. This does not change the code flow.

Comment on lines +379 to +382
} catch (Exception e) {
log.error("Unable to get expiration date from signed messages response. Setting expirationData to 'null'", e);
timWithExpiration.put("expirationDate", "null");
}
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: this is one of the rare times in this code where we can safely continue (as far as I can tell from the code) processing a message should an Exception be thrown. I intentionally handled the expectation here and allowed the processing to continue. I couldn't find any downstream issues in the code. Also, please note that we already handled the exception and continued processing in the previous implementation. This does not change the code flow.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: This code is heavily tested indirectly by the Asn1EncodedDataRouter tests. The existing tests were not adding much value (I wrote most of them, so my bad), and meaningful tests were difficult to write without duplicating much of the testing already performed in the Asn1EncodedDataRouter tests.

…for Kafka container setup

Replaced ConcurrentMessageListenerContainer with KafkaMessageListenerContainer and removed KafkaConsumerConfig dependency. This fixes an issue where we would intermittently encounter the following error when running tests:

java.lang.IllegalStateException: Expected 1 but got 0 partitions
	at org.springframework.kafka.test.utils.ContainerTestUtils.waitForAssignment(ContainerTestUtils.java:85)
Updated the expiration date calculation to use `Instant` and ensure accuracy by handling milliseconds properly. Adjusted test cases and mock classes to align with the updated logic. Fixed incorrect test data to reflect the corrected expiration date format.
Replaced the outdated `SimpleDateFormat` with the modern `DateTimeFormatter` to handle date formatting and parsing. This improves thread safety and code readability while ensuring alignment with Java's modern date/time APIs. Adjusted test resources to reflect updated timestamp formatting logic.
Add exception handling to catch RestClientException when signing TIM messages. Log detailed error messages, including specific handling for HttpClientErrorException.NotFound, to provide better debugging information and highlight potential misconfiguration of jpo-security-svcs.
Convert TIM start flag to uppercase for consistency and improve logging to dynamically include the expected start flag value. This enhances clarity and ensures robust matching during header stripping.
Consolidated signing logic into `depositToTimCertExpirationTopic` and removed redundant `signTimWithExpiration` method. Added utility for obtaining hex-encoded signed messages in `SignatureResultModel` to streamline encoding operations.
If we fail to sign the message, we should fail to process the message further. We don't want any unsigned messages in our system (if signing is enabled)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant